
[RSDK-8666] Use Stoppable Workers #408

Merged: bashar-515 merged 41 commits into viamrobotics:main from RSDK-8666-part-2 on Jan 28, 2025

Conversation

bashar-515 (Member) commented on Jan 22, 2025:

WIP

This PR is a continuation of #402, per this comment from @benjirewis. It updates the following files to use StoppableWorkers (and touches some other files as a side effect):

  • rpc/wrtc_server.go
  • rpc/wrtc_call_queue_memory.go
  • rpc/wrtc_call_queue_mongodb.go
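
For readers skimming the diffs below, here's a hedged sketch of the pattern being applied. Only `Add`, `Stop`, and `Context` appear in this PR's hunks; the `NewStoppableWorkers` constructor name and the simplified `server` type are assumptions for illustration. The idea: a stored context.Context/cancel pair plus a sync.WaitGroup collapse into a single StoppableWorkers value, whose Context() replaces the struct's old context and whose Stop() cancels and then waits.

```go
package main

import (
	"context"

	"go.viam.com/utils"
)

// Hypothetical, simplified stand-in for webrtcServer. The old fields
// (a context.Context, its cancel func, and a sync.WaitGroup) collapse
// into a single StoppableWorkers value.
type server struct {
	workers *utils.StoppableWorkers
}

func newServer() *server {
	// Constructor name assumed; only Add/Stop/Context appear in this PR.
	srv := &server{workers: utils.NewStoppableWorkers()}
	srv.workers.Add(func(ctx context.Context) {
		<-ctx.Done() // canceled when srv.workers.Stop() runs
	})
	return srv
}

// Stop cancels the shared context and then waits for every worker to exit.
func (srv *server) Stop() { srv.workers.Stop() }

func main() {
	newServer().Stop()
}
```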

}
-	select {
-	case <-cancelCtx.Done():
+	if ctx.Err() != nil {
bashar-515 (Member, Author):

Removing this check results in a deadlock/hang when running tests in the rpc package.

Member:

Yeah, we'll still want to check the context; this looks correct. The ticker case should be removed too, as you've done. Just see my comment about not needing the for loop surrounding this at all.
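
A minimal runnable sketch of that shape (sendHeartbeat is a hypothetical stand-in for the real per-tick body): the for loop and the ticker case move into the StoppableWorkers helper, while the worker keeps the context check so a tick racing Stop returns promptly.

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical stand-in for the real per-tick work.
func sendHeartbeat(context.Context) { fmt.Println("heartbeat") }

// The per-tick worker body after the change: no surrounding for loop and
// no ticker case, just the context check (still needed, per the
// deadlock/hang observed above) plus the work itself.
func worker(ctx context.Context) {
	if ctx.Err() != nil {
		return
	}
	sendHeartbeat(ctx)
}

func main() { worker(context.Background()) }
```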

@@ -37,7 +37,7 @@ func newWebRTCServerChannel(
 	logger utils.ZapCompatibleLogger,
 ) *webrtcServerChannel {
 	base := newBaseChannel(
-		server.ctx,
+		server.processHeadersWorkers.Context(),
bashar-515 (Member, Author):

An artifact of no longer storing a context.Context inside the webrtcServer struct.

Member:

Gotcha; this makes sense to me. There's a hierarchy here: the WebRTC server owns the WebRTC server channels, which own the WebRTC server streams, and we want the context cancellations to propagate in that order. So I think what you have here is correct: once Stop() is called on the WebRTC server, we should cancel all the way down.

Member:

Agreed this will do the right thing. It would be more customary to just have newWebRTCServerChannel accept a context to bound its lifetime, but no need to change how things are structured right now.
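
A small runnable sketch of that customary shape, with hypothetical, simplified types: the constructor accepts a context bounding the channel's lifetime, and the call site hands it the server's worker context, so canceling at the top propagates down.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified, hypothetical stand-in for webrtcServerChannel.
type serverChannel struct{ ctx context.Context }

// The constructor accepts a context bounding the channel's lifetime
// instead of reaching into the server struct for one.
func newServerChannel(ctx context.Context) *serverChannel {
	return &serverChannel{ctx: ctx}
}

func main() {
	// Stands in for server.processHeadersWorkers.Context().
	serverCtx, stop := context.WithCancel(context.Background())

	ch := newServerChannel(serverCtx)
	stop() // Stop() on the server cancels channels (and their streams) below it

	fmt.Println(ch.ctx.Err()) // context.Canceled
}
```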

-	if err := s.ch.server.ctx.Err(); err != nil {
-		s.ch.server.processHeadersWorkers.Done()
+	// in which case we return.
+	if err := s.ch.server.processHeadersWorkers.Context().Err(); err != nil {
bashar-515 (Member, Author) commented on Jan 22, 2025:

Also an artifact of removing the webrtcServer struct's context.Context. See above.

Member:

@dgottlieb do you remember why we need this context check before even taking a ticket?

Member:

I'm not seeing anything arguing that the ordering is important. I think the ordering was simply there to make the relationship between Add-ing and the context canceling explicit, because Stop both cancels the context and waits on the wait group.

My expectation is that by moving to StoppableWorkers, we no longer need this if-block. StoppableWorkers does that for us before calling its worker function.
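
For illustration, a minimal sketch of that behavior (this is not the actual goutils source): Add checks the shared context before spawning, and Stop cancels and then waits, which is exactly what the removed if-block used to guard against.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// A sketch only; the real goutils StoppableWorkers may differ internally.
type stoppableWorkers struct {
	mu     sync.Mutex
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newStoppableWorkers() *stoppableWorkers {
	ctx, cancel := context.WithCancel(context.Background())
	return &stoppableWorkers{ctx: ctx, cancel: cancel}
}

// Add refuses to start new work once the shared context is canceled, so
// callers no longer need their own ctx.Err() check before taking a ticket.
func (sw *stoppableWorkers) Add(worker func(context.Context)) {
	sw.mu.Lock()
	defer sw.mu.Unlock()
	if sw.ctx.Err() != nil {
		return
	}
	sw.wg.Add(1)
	go func() {
		defer sw.wg.Done()
		worker(sw.ctx)
	}()
}

// Stop cancels the shared context, then waits on the wait group.
func (sw *stoppableWorkers) Stop() {
	sw.mu.Lock()
	sw.cancel()
	sw.mu.Unlock()
	sw.wg.Wait()
}

func main() {
	sw := newStoppableWorkers()
	sw.Add(func(ctx context.Context) { <-ctx.Done() })
	sw.Stop()
	sw.Add(func(ctx context.Context) { fmt.Println("never runs") })
}
```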

Member:

Cool; that's my thought, too. Wanna remove this whole if-block, @bashar-515?

Member:

This file is pretty critical to the functioning of distributed signaling for Viam. I'm OK not even making changes to it, since we haven't yet seen any issues in its usage of wait groups/contexts. If you and @dgottlieb want to get StoppableWorkers into this file, though, then I think we should test a local version of app + MongoDB to ensure signaling still works. We could also probably do it with an app PR that creates a temporary staging environment.

delete(hostQueue.activeOffers, offerID)
}
queue.activeBackgroundWorkers = utils.NewStoppableWorkerWithTicker(5*time.Second, func(ctx context.Context) {
if ctx.Err() != nil {
Member:

Before, this was necessary to exit the loop, but it's no longer serving a purpose: the StoppableWorkers caller checks the state/context before calling this method.
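
A short usage sketch of the ticker-based constructor from this hunk, assuming only the API visible in this PR (NewStoppableWorkerWithTicker plus Stop); the callback body is a hypothetical stand-in:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.viam.com/utils"
)

func main() {
	// The helper owns the ticker and the loop; the callback runs once per
	// tick, so the worker body needs no surrounding for loop of its own.
	workers := utils.NewStoppableWorkerWithTicker(100*time.Millisecond, func(ctx context.Context) {
		fmt.Println("expiring stale offers") // stands in for the real cleanup body
	})

	time.Sleep(350 * time.Millisecond)
	workers.Stop() // cancels the shared context and waits for the callback
}
```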

-	queue.activeBackgroundWorkers.Add(1)
-	utils.PanicCapturingGo(func() {
-		queue.activeBackgroundWorkers.Done()
+	queue.activeBackgroundWorkers.Add(func(ctx context.Context) {
 		select {
 		case <-sendCtx.Done():
 		case <-ctx.Done():
Member:

This is no longer the same context. Before, this ctx was the one passed into SendOfferInit; now we've shadowed it with the activeBackgroundWorkers context, which is already covered by sendCtx.

Taking a step back: in the existing code, I'm not sure what the relationship is between the caller's context and queue.cancelCtx. I *think* the SendOfferInit ctx is the gRPC request's context, in which case we'd want to revert to using that one.
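
A self-contained illustration of the shadowing (all names hypothetical): renaming the worker's parameter lets the select capture the request-scoped context from the enclosing scope again, which is the revert being suggested.

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	// Stands in for the ctx passed into SendOfferInit (the gRPC request context).
	requestCtx, cancelRequest := context.WithCancel(context.Background())
	// Stands in for the activeBackgroundWorkers' shared context.
	workersCtx := context.Background()

	done := make(chan string, 1)
	// Naming the parameter workerCtx (rather than ctx) keeps the select
	// watching the request context; the buggy version shadowed it.
	worker := func(workerCtx context.Context) {
		select {
		case <-requestCtx.Done():
			done <- "request canceled: unblocked as intended"
		case <-workerCtx.Done():
			done <- "workers stopped"
		}
	}
	go worker(workersCtx)

	cancelRequest()
	fmt.Println(<-done)
}
```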

bashar-515 (Member, Author):

Nice catch; totally missed this.


csStateUpdates: make(chan changeStreamStateUpdate),
callExchangeSubs: map[string]map[*mongodbCallExchange]struct{}{},
waitingForNewCallSubs: map[string]map[*mongodbNewCallEventHandler]struct{}{},
activeAnswerersfunc: &activeAnswerersfunc,
}

-	queue.activeBackgroundWorkers.Add(2)
+	// TODO(RSDK-8666): using StoppableWorkers is causing a data race
Member:

Did we find any more detail about what was happening here?

 	defer queue.csManagerSeq.Add(1) // helpful on panicked restart
 	select {
-	case <-queue.cancelCtx.Done():
+	case <-queue.activeStoppableWorkers.Context().Done():
Member:

I would expect this to be replaced with the input ctx.

-	utils.ManagedGo(queue.operatorLivenessLoop, queue.activeBackgroundWorkers.Done)
-	utils.ManagedGo(queue.changeStreamManager, queue.activeBackgroundWorkers.Done)
+	// TODO(RSDK-8666): using StoppableWorkers is causing a data race
+	queue.activeStoppableWorkers.Add(func(ctx context.Context) { queue.operatorLivenessLoop() })
Member:

We should change the queue.operatorLivenessLoop and queue.changeStreamManager functions to accept the context, and change their selects on queue.activeStoppableWorkers.Context() to instead select on the function's input ctx.
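
A hedged sketch of that refactor, with the loop bodies elided; the method values then satisfy func(context.Context) and can be handed straight to Add:

```go
// Sketch only; bodies elided. The loops accept the worker context rather
// than reading it off the queue.
func (queue *mongoDBWebRTCCallQueue) operatorLivenessLoop(ctx context.Context) {
	ticker := time.NewTicker(operatorStateUpdateInterval)
	defer ticker.Stop()
	for {
		if !utils.SelectContextOrWaitChan(ctx, ticker.C) {
			return
		}
		// ... existing liveness update ...
	}
}

func (queue *mongoDBWebRTCCallQueue) changeStreamManager(ctx context.Context) {
	// ... existing body, selecting on the input ctx instead of
	// queue.activeStoppableWorkers.Context() ...
}

// Registration then becomes:
//	queue.activeStoppableWorkers.Add(queue.operatorLivenessLoop)
//	queue.activeStoppableWorkers.Add(queue.changeStreamManager)
```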

@@ -344,7 +339,7 @@ func (queue *mongoDBWebRTCCallQueue) operatorLivenessLoop() {
 	ticker := time.NewTicker(operatorStateUpdateInterval)
 	defer ticker.Stop()
 	for {
-		if !utils.SelectContextOrWaitChan(queue.cancelCtx, ticker.C) {
+		if !utils.SelectContextOrWaitChan(queue.activeStoppableWorkers.Context(), ticker.C) {
Member:

As per the comment about having operatorLivenessLoop accept a ctx, this is the line that caught my attention. We should just be checking the input context, with no need for this lookup on the StoppableWorkers itself.

@@ -500,11 +496,11 @@ func (queue *mongoDBWebRTCCallQueue) changeStreamManager() {
 	queue.csStateMu.Unlock()
 	activeHosts.Set(queue.operatorID, int64(len(hosts)))

-	nextCSCtx, nextCSCtxCancel := context.WithCancel(queue.cancelCtx)
+	nextCSCtx, nextCSCtxCancel := context.WithCancel(queue.activeStoppableWorkers.Context())
Member:

ctx (as above: this should derive from the input ctx)

@@ -235,11 +230,11 @@ func NewMongoDBWebRTCCallQueue(
})
queue.subscriptionManager(newState.ChangeStream)
Member:

Also pass the ctx into subscriptionManager

@@ -677,14 +673,14 @@ func (queue *mongoDBWebRTCCallQueue) processNextSubscriptionEvent(next mongoutil
 func (queue *mongoDBWebRTCCallQueue) subscriptionManager(currentCS <-chan mongoutils.ChangeEventResult) {
 	var waitForNextCS bool
 	for {
-		if queue.cancelCtx.Err() != nil {
+		if queue.activeStoppableWorkers.Context().Err() != nil {
Member:

As per the above, if we pass the ctx into this call, we can stop accessing activeStoppableWorkers here.

@@ -889,7 +885,7 @@ func (queue *mongoDBWebRTCCallQueue) SendOfferInit(
 	sendCtx, sendCtxCancel := context.WithDeadline(ctx, offerDeadline)

 	// need to watch before insertion to avoid a race
-	sendAndQueueCtx, sendAndQueueCtxCancel := utils.MergeContext(sendCtx, queue.cancelCtx)
+	sendAndQueueCtx, sendAndQueueCtxCancel := utils.MergeContext(sendCtx, queue.activeStoppableWorkers.Context())
Member:

I suppose we can keep this to minimize the changes in this refactor, but it feels clunky: we're merging here just to reduce the number of cases inside the selects below by one.

And I think there are only two select statements.
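
For comparison, a rough sketch of the unmerged alternative (the events channel and handler here are hypothetical): each of the two selects just gains one extra case on the workers' context.

```go
// Instead of merging sendCtx with the workers' context, select on both:
workersCtx := queue.activeStoppableWorkers.Context()
select {
case <-sendCtx.Done():
	return sendCtx.Err()
case <-workersCtx.Done():
	return workersCtx.Err()
case ev := <-events: // hypothetical channel the real selects receive on
	handle(ev) // hypothetical handler
}
```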


bashar-515 (Member, Author):

No longer upgrading rpc/wrtc_call_queue_mongodb.go. Ticket tracking it here.

bashar-515 merged commit cf6c56e into viamrobotics:main on Jan 28, 2025, with 6 checks passed, and deleted the RSDK-8666-part-2 branch on January 28, 2025 at 18:43.